跳到主要内容

gRPC 的四种服务类型

前言

入职了这么久,都没有抽出时间学习 gRPC 相关的知识,以至于我都忘记这也是一整套技术栈,果然是 “鱼在水里游,却忘了有水;鸟乘着风飞,却不知有风” 看不清自己所处的环境,也看不清自己的本质,浑浑噩噩地活着,着实是一种悲哀。

言归正传,首先是理解 gRPC 是什么?它相比传统的 RPC 方式有什么区别(Restful、TCP 等等)?

什么是 gRPC

RPC 框架基本都是直接基于 TCP 协议自研数据结构和编解码方式,但是 gRPC 却完全不是这样,它使用 HTTP/2 协议来传输数据(主要是 Google 在推广 HTTP/2,且基于 HTTP/2 的一些新特性也会让实现方案上少写很多代码)。

既然底层使用 HTTP/2,那为啥还要用 RPC,不直接用 Restful 的方式更直接吗。RPC 通常使用二进制编码来压缩消息的内容,Restful 更多的使用 JSON 格式,消息体中的冗余数据比较多,性能不如 RPC。

gRPC 默认使用 protocol buffers,这是 Google 开源的一套成熟的结构数据序列化机制(当然也可以使用其他数据格式如 JSON)。

服务类型的定义

服务主要分四种,如下注释所示: 1、单项 RPC(最常用的类型) 2、服务端流式 RPC 3、客户端流式 RPC 4、双向流式 RPC

各种服务使用例

具体如下注释所示:

syntax = "proto3"; // 指定proto版本

package hello; // 指定默认包名

service HelloService {

// 单项 RPC,即客户端发送一个请求给服务端,从服务端获取一个应答,就像一次普通的函数调用。
rpc SayHello (HelloRequest) returns (HelloResponse);

// 服务端流式 RPC,即客户端发送一个请求给服务端,可获取一个数据流用来读取一系列消息。
// 客户端从返回的数据流里一直读取直到没有更多消息为止。
rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);

// 客户端流式 RPC,即客户端用提供的一个数据流写入并发送一系列消息给服务端。
// 一旦客户端完成消息写入,就等待服务端读取这些消息并返回应答。
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);

// 双向流式 RPC,即两边都可以分别通过一个读写数据流来发送一系列消息。这两个数据流操作是相互独立的,
// 所以客户端和服务端能按其希望的任意顺序读写,
// 例如:服务端可以在写应答前等待所有的客户端消息,或者它可以先读一个消息再写一个消息,
// 或者是读写相结合的其他方式。每个数据流里消息的顺序会被保持。
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);
}

message HelloRequest {
string greeting = 1;
}

message HelloResponse {
string reply = 1;
}

生成对应的接口

protoc -I ./protos --go_out=plugins=grpc:./hello_proto --go_opt=paths=source_relative protos/*.proto

编写服务端代码 server.go 并启动

import (
"context"
"fmt"
"google.golang.org/grpc"
"io"
"log"
"net"
"stgrpc/hello_proto"
)

var _ hello_proto.HelloServiceServer = (*helloServer)(nil)

type helloServer struct{}

// SayHello 单项流式 :单个请求,单个响应
func (h *helloServer) SayHello(ctx context.Context, request *hello_proto.HelloRequest) (*hello_proto.HelloResponse, error) {
greeting := request.GetGreeting()
log.Println("1. 收到问候:", greeting)
return &hello_proto.HelloResponse{
Reply: "你好,我收到了请求",
}, nil
}

// LotsOfReplies 服务端流式 :单个请求,集合响应
func (h *helloServer) LotsOfReplies(request *hello_proto.HelloRequest, stream hello_proto.HelloService_LotsOfRepliesServer) error {
greeting := request.GetGreeting()
log.Println("2. 收到问候:", greeting)

for i := 0; i < 10; i++ {
stream.Send(&hello_proto.HelloResponse{
Reply: fmt.Sprintf("这是响应:%d", i),
})
}

return nil
}

// LotsOfGreetings 客户端流式 :集合请求,单个响应
func (h *helloServer) LotsOfGreetings(reqStream hello_proto.HelloService_LotsOfGreetingsServer) error {
addVal := ""
for {
// 一次接受一条记录
singleRequest, err := reqStream.Recv()
// 不等于io.EOF表示这是条有效记录
if err == io.EOF {
log.Println("3. 客户端发送完毕")
break
} else if err != nil {
log.Fatalln("3. 接收时发生异常", err)
break
} else {
log.Println("3. 收到请求:", singleRequest.GetGreeting())
// 收完之后,执行SendAndClose返回数据并结束本次调用
addVal += singleRequest.GetGreeting()
}
}

return reqStream.SendAndClose(&hello_proto.HelloResponse{
Reply: "响应消息:" + addVal,
})
}

// BidiHello 双向流式 :集合请求,集合响应
func (h *helloServer) BidiHello(reqStream hello_proto.HelloService_BidiHelloServer) error {
for {
singleRequest, err := reqStream.Recv()
// 不等于 io.EOS 表示这是条有效记录
if err == io.EOF {
log.Println("4. 接收完毕")
return nil
} else if err != nil {
log.Fatalln("4. 接收时发生异常", err)
return err
} else {
log.Println("4. 接收到数据", singleRequest.GetGreeting())
if sendErr := reqStream.Send(&hello_proto.HelloResponse{Reply: "响应消息:" + singleRequest.GetGreeting()}); sendErr != nil {
log.Println("4. 返回数据异常数据", sendErr)
return sendErr
}
}
}
}

func main() {
// 要监听的协议和端口
lis, err := net.Listen("tcp", ":45678")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}

// 实例化gRPC server结构体
s := grpc.NewServer()

// 服务注册
hello_proto.RegisterHelloServiceServer(s, &helloServer{})

log.Println("开始监听,等待远程调用...")
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}

客户端

具体的 客户端代码 就不贴了,总之创建连接的核心代码如下:

// 远程连接服务端
conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
// main方法执行完毕后关闭远程连接
defer conn.Close()
// 实例化数据结构
client := hello_proto.NewHelloServiceClient(conn)
// 超时设置
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

不过为了方便调试,这里直接使用 gRPC 客户端 bloomrpc 来发送请求了

20220526095053